// Copyright Epic Games, Inc. All Rights Reserved.

#include <zenstore/blockstore.h>

#include <zencore/except.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>

#include <algorithm>
#include <unordered_map>

ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
#include <tsl/robin_set.h>
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END

#if ZEN_WITH_TESTS
#	include <zencore/compactbinarybuilder.h>
#	include <zencore/testing.h>
#	include <zencore/testutils.h>
#	include <zencore/workthreadpool.h>
#	include <random>
#endif

//////////////////////////////////////////////////////////////////////////

namespace zen {

//////////////////////////////////////////////////////////////////////////

BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath)
{
}

// Resets the buffer that was built on top of the file handle first, then
// detaches the handle from m_File.
BlockStoreFile::~BlockStoreFile()
{
	m_IoBuffer = IoBuffer{};
	m_File.Detach();
}

const std::filesystem::path&
BlockStoreFile::GetPath() const
{
	return m_Path;
}

// Opens the existing block file at m_Path (BasicFile::Mode::kDelete), caches its size and
// wraps the whole file in m_IoBuffer.
//
// A failed open is retried up to three times; the retry callback returns false once the
// retries are used up, handing the failure back to BasicFile::Open.
void
BlockStoreFile::Open()
{
	ZEN_TRACE_CPU("BlockStoreFile::Open");
	uint32_t RetriesLeft = 3;
	m_File.Open(m_Path, BasicFile::Mode::kDelete, [&](std::error_code& Ec) {
		if (RetriesLeft == 0)
		{
			return false;
		}
		ZEN_WARN("Failed to open cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft);
		// Back off 100, 200, 300 ms. The delay must grow with each attempt: subtracting here
		// would yield 0 ms on the second retry and wrap the unsigned value on the third.
		Sleep(100 + (3 - RetriesLeft) * 100);  // Total 600 ms
		RetriesLeft--;
		return true;
	});
	void* FileHandle = m_File.Handle();
	m_CachedFileSize = m_File.FileSize();
	m_IoBuffer		 = IoBuffer(IoBuffer::File, FileHandle, 0, m_CachedFileSize, /*IsWholeFile*/ true);
}

// Creates (or truncates) the block file at m_Path using BasicFile::Mode::kTruncateDelete and
// sets up m_IoBuffer to span InitialSize bytes. The parent directory is created when missing.
//
// A failed open is retried up to three times; the retry callback returns false once the
// retries are used up, handing the failure back to BasicFile::Open.
void
BlockStoreFile::Create(uint64_t InitialSize)
{
	ZEN_TRACE_CPU("BlockStoreFile::Create");

	auto ParentPath = m_Path.parent_path();
	if (!std::filesystem::is_directory(ParentPath))
	{
		CreateDirectories(ParentPath);
	}

	uint32_t RetriesLeft = 3;
	m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) {
		if (RetriesLeft == 0)
		{
			return false;
		}
		ZEN_WARN("Failed to create cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft);
		// Back off 100, 200, 300 ms. The delay must grow with each attempt: subtracting here
		// would yield 0 ms on the second retry and wrap the unsigned value on the third.
		Sleep(100 + (3 - RetriesLeft) * 100);  // Total 600 ms
		RetriesLeft--;
		return true;
	});

	void* FileHandle = m_File.Handle();

	// We map our m_IoBuffer beyond the file size as we will grow it over time and want
	// to be able to create sub-buffers of all the written range later
	m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize, false);
}

// Returns the size cached by Open() when it is non-zero; otherwise asks the
// underlying file for its current size.
uint64_t
BlockStoreFile::FileSize()
{
	if (m_CachedFileSize != 0)
	{
		return m_CachedFileSize;
	}
	return m_File.FileSize();
}

// Flags the block's IoBuffer as delete-on-close.
void
BlockStoreFile::MarkAsDeleteOnClose()
{
	constexpr bool DeleteOnClose = true;
	m_IoBuffer.SetDeleteOnClose(DeleteOnClose);
}

// Returns a sub-buffer of m_IoBuffer covering [Offset, Offset + Size).
// An empty IoBuffer is returned when the requested range does not fit inside m_IoBuffer.
IoBuffer
BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size)
{
	const uint64_t BufferSize = m_IoBuffer.GetSize();
	// Compare against the remaining space rather than computing Offset + Size, which could
	// wrap around for large inputs and let an out-of-range request through.
	if (Offset > BufferSize || Size > BufferSize - Offset)
	{
		return {};
	}
	return IoBuffer(m_IoBuffer, Offset, Size);
}

void
BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset)
{
	m_File.Read(Data, Size, FileOffset);
}

void
BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
{
	ZEN_TRACE_CPU("BlockStoreFile::Write");
	m_File.Write(Data, Size, FileOffset);
}

void
BlockStoreFile::Flush()
{
	ZEN_TRACE_CPU("BlockStoreFile::Flush");
	m_File.Flush();
}

// Gives direct access to the underlying file object.
BasicFile&
BlockStoreFile::GetBasicFile()
{
	return this->m_File;
}

void
BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
{
	m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
}
// True once Open() or Create() has set up a non-empty m_IoBuffer.
bool
BlockStoreFile::IsOpen() const
{
	const bool HasBuffer = !!m_IoBuffer;
	return HasBuffer;
}

// Baseline size (2 MiB) of the read window IterateBlock uses when batching small chunks.
constexpr uint64_t DefaultIterateSmallChunkWindowSize = uint64_t{2} * 1024 * 1024;
// A chunk starting this many bytes (4 KiB) or more past the end of the previous chunk
// ends the current batched read range in IterateBlock.
constexpr uint64_t IterateSmallChunkMaxGapSize = uint64_t{4} * 1024;

// Nothing to set up here; the store becomes usable after Initialize().
BlockStore::BlockStore() = default;

// No explicit teardown; members clean up after themselves.
BlockStore::~BlockStore() = default;

// Configures the store and registers every block file already present under BlocksBasePath.
//
// BlocksBasePath - root folder for block files; created when it does not exist.
// MaxBlockSize   - upper size limit for a single block, must be > 0.
// MaxBlockCount  - number of block index slots, must be > 0 and a power of two
//                  (block indexes wrap using MaxBlockCount - 1 as a mask).
//
// The folder tree is scanned breadth-first. Every regular file with the block file extension
// whose stem parses as a hex number is opened and added to m_ChunkBlocks, and its size is
// added to m_TotalSize. The write block index is set to one past the highest index found.
void
BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount)
{
	ZEN_TRACE_CPU("BlockStore::Initialize");

	ZEN_ASSERT(MaxBlockSize > 0);
	ZEN_ASSERT(MaxBlockCount > 0);
	ZEN_ASSERT(IsPow2(MaxBlockCount));

	m_TotalSize		 = 0;
	m_BlocksBasePath = BlocksBasePath;
	m_MaxBlockSize	 = MaxBlockSize;
	m_MaxBlockCount	 = MaxBlockCount;

	if (std::filesystem::is_directory(m_BlocksBasePath))
	{
		uint32_t						   NextBlockIndex = 0;
		std::vector<std::filesystem::path> FoldersToScan;
		FoldersToScan.push_back(m_BlocksBasePath);
		size_t FolderOffset = 0;
		// FoldersToScan doubles as the work queue: sub folders are appended while scanning
		while (FolderOffset < FoldersToScan.size())
		{
			for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
			{
				if (Entry.is_directory())
				{
					FoldersToScan.push_back(Entry.path());
					continue;
				}
				if (Entry.is_regular_file())
				{
					const std::filesystem::path Path = Entry.path();
					if (Path.extension() != GetBlockFileExtension())
					{
						continue;
					}
					std::string FileName = PathToUtf8(Path.stem());
					uint32_t	BlockIndex;
					bool		OK = ParseHexNumber(FileName, BlockIndex);
					if (!OK)
					{
						// Not named like a block file, leave it alone
						continue;
					}
					Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)};
					BlockFile->Open();
					m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed);
					m_ChunkBlocks[BlockIndex] = BlockFile;
					if (BlockIndex >= NextBlockIndex)
					{
						NextBlockIndex = (BlockIndex + 1) & (m_MaxBlockCount - 1);
					}
				}
			}
			++FolderOffset;
		}
		m_WriteBlockIndex.store(NextBlockIndex, std::memory_order_release);
	}
	else
	{
		CreateDirectories(m_BlocksBasePath);
	}
}

// Inserts BlockIndex into the sorted BlockIndexes container unless it is already present.
void
BlockStore::BlockIndexSet::Add(uint32_t BlockIndex)
{
	// One lower_bound gives both the membership answer and the insert position, so the
	// container is searched once instead of twice.
	auto It = std::lower_bound(begin(BlockIndexes), end(BlockIndexes), BlockIndex);
	if (It == end(BlockIndexes) || *It != BlockIndex)
	{
		BlockIndexes.insert(It, BlockIndex);
	}
}

void
BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations)
{
	ZEN_TRACE_CPU("BlockStore::SyncExistingBlocksOnDisk");

	RwLock::ExclusiveLockScope InsertLock(m_InsertLock);

	tsl::robin_set<uint32_t> MissingBlocks;
	tsl::robin_set<uint32_t> DeleteBlocks;
	DeleteBlocks.reserve(m_ChunkBlocks.size());
	for (auto It : m_ChunkBlocks)
	{
		DeleteBlocks.insert(It.first);
	}

	for (const uint32_t BlockIndex : KnownLocations.GetBlockIndices())
	{
		DeleteBlocks.erase(BlockIndex);
		if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull())
		{
			continue;
		}
		else
		{
			MissingBlocks.insert(BlockIndex);
		}
	}
	for (std::uint32_t BlockIndex : MissingBlocks)
	{
		std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
		Ref<BlockStoreFile>	  NewBlockFile(new BlockStoreFile(BlockPath));
		NewBlockFile->Create(0);
		m_ChunkBlocks[BlockIndex] = NewBlockFile;
	}
	for (std::uint32_t BlockIndex : DeleteBlocks)
	{
		std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
		if (m_ChunkBlocks[BlockIndex])
		{
			m_TotalSize.fetch_sub(m_ChunkBlocks[BlockIndex]->FileSize(), std::memory_order::relaxed);
			m_ChunkBlocks[BlockIndex]->MarkAsDeleteOnClose();
		}
		m_ChunkBlocks.erase(BlockIndex);
	}
}

// Selects the blocks whose used share is below BlockUsageThresholdPercent.
//
// BlockUsage                 - per block index: bytes in use (DiskUsage) and number of
//                              entries (EntryCount); blocks without an entry count as unused.
// BlockUsageThresholdPercent - 100 selects every block that is not fully used, 0 selects
//                              only blocks with no used bytes, any other value selects blocks
//                              whose used percentage is strictly below it.
//
// Returns a map from block index to the used entry count of each selected block.
// The current write block and blocks with writes in flight are never selected; blocks with a
// file size of zero (or a null file) are always selected.
BlockStore::BlockEntryCountMap
BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent)
{
	ZEN_TRACE_CPU("BlockStore::GetBlocksToCompact");
	BlockEntryCountMap Result;
	{
		RwLock::SharedLockScope InsertLock(m_InsertLock);
		for (const auto& It : m_ChunkBlocks)
		{
			uint32_t BlockIndex = It.first;
			if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock)
			{
				continue;
			}
			if (std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), BlockIndex) != m_ActiveWriteBlocks.end())
			{
				continue;
			}

			uint64_t UsedSize  = 0;
			uint32_t UsedCount = 0;
			if (auto UsageIt = BlockUsage.find(BlockIndex); UsageIt != BlockUsage.end())
			{
				UsedSize  = UsageIt->second.DiskUsage;
				UsedCount = UsageIt->second.EntryCount;
			}

			uint64_t BlockSize = It.second ? It.second->FileSize() : 0u;
			if (BlockSize == 0)
			{
				// Nothing on disk to keep; this also avoids dividing by zero below
				Result.insert_or_assign(BlockIndex, UsedCount);
				continue;
			}

			if (BlockUsageThresholdPercent == 100)
			{
				if (UsedSize < BlockSize)
				{
					Result.insert_or_assign(BlockIndex, UsedCount);
				}
			}
			else if (BlockUsageThresholdPercent == 0)
			{
				if (UsedSize == 0)
				{
					Result.insert_or_assign(BlockIndex, UsedCount);
				}
			}
			else
			{
				// Usage is clamped to 100% when more bytes are reported used than the file holds
				const uint32_t UsedPercent = UsedSize < BlockSize ? gsl::narrow<uint32_t>((100 * UsedSize) / BlockSize) : 100u;
				if (UsedPercent < BlockUsageThresholdPercent)
				{
					Result.insert_or_assign(BlockIndex, UsedCount);
				}
			}
		}
	}
	return Result;
}

// Drops the current write block and all registered blocks and forgets the base path.
// The whole reset happens under the exclusive insert lock.
void
BlockStore::Close()
{
	ZEN_TRACE_CPU("BlockStore::Close");

	RwLock::ExclusiveLockScope InsertLock(m_InsertLock);

	// Reset the write cursor state
	m_WriteBlock = nullptr;
	m_CurrentInsertOffset = 0;
	m_WriteBlockIndex.store(0);

	// Release every registered block
	m_ChunkBlocks.clear();
	m_BlocksBasePath.clear();
}

// Finds a block index that is neither registered in m_ChunkBlocks nor present as a file on disk.
//
// ProbeIndex   - index to start probing from; it is wrapped into [0, m_MaxBlockCount).
// (unnamed)    - proof that the caller holds the exclusive insert lock.
// OutBlockPath - receives the path probed last; on success this is the path for the returned index.
//
// Returns the free index, or m_MaxBlockCount (as uint32_t) when no index could be allocated:
// all slots are registered, the file system probe failed, or every slot was probed without
// finding a free one.
uint32_t
BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&, std::filesystem::path& OutBlockPath) const
{
	if (m_ChunkBlocks.size() == m_MaxBlockCount)
	{
		return (uint32_t)m_MaxBlockCount;
	}

	const uint64_t IndexMask = m_MaxBlockCount - 1;

	// Callers may pass "current index + 1" unwrapped. Wrap it so an index equal to
	// m_MaxBlockCount is never handed back as a valid result (it is the failure value).
	ProbeIndex = static_cast<uint32_t>(ProbeIndex & IndexMask);

	// Probe each slot at most once. Without the bound the search would spin forever when every
	// unregistered index already has a file on disk.
	for (uint64_t Attempt = 0; Attempt < m_MaxBlockCount; ++Attempt)
	{
		if (!m_ChunkBlocks.contains(ProbeIndex))
		{
			OutBlockPath = GetBlockPath(m_BlocksBasePath, ProbeIndex);
			std::error_code Ec;
			bool			Exists = std::filesystem::exists(OutBlockPath, Ec);
			if (Ec)
			{
				ZEN_WARN("Failed to probe existence of file '{}' when trying to allocate a new block. Reason: '{}'",
						 OutBlockPath,
						 Ec.message());
				return (uint32_t)m_MaxBlockCount;
			}
			if (!Exists)
			{
				return ProbeIndex;
			}
		}
		ProbeIndex = static_cast<uint32_t>((static_cast<uint64_t>(ProbeIndex) + 1) & IndexMask);
	}
	return (uint32_t)m_MaxBlockCount;
}

void
BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback)
{
	ZEN_TRACE_CPU("BlockStore::WriteChunk");

	ZEN_ASSERT(Data != nullptr);
	ZEN_ASSERT(Size > 0u);
	ZEN_ASSERT(Size <= m_MaxBlockSize);
	ZEN_ASSERT(Alignment > 0u);

	uint32_t ChunkSize = gsl::narrow<uint32_t>(Size);

	RwLock::ExclusiveLockScope InsertLock(m_InsertLock);

	uint32_t WriteBlockIndex	 = m_WriteBlockIndex.load(std::memory_order_acquire);
	bool	 IsWriting			 = !!m_WriteBlock;
	uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment);
	if (!IsWriting || (AlignedInsertOffset + ChunkSize) > m_MaxBlockSize)
	{
		if (m_WriteBlock)
		{
			m_WriteBlock->Flush();
			m_WriteBlock = nullptr;
		}

		WriteBlockIndex += IsWriting ? 1 : 0;
		std::filesystem::path BlockPath;
		WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath);
		if (WriteBlockIndex == (uint32_t)m_MaxBlockCount)
		{
			throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
		}

		Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
		NewBlockFile->Create(m_MaxBlockSize);

		m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
		m_WriteBlock				   = NewBlockFile;
		m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
		m_CurrentInsertOffset = 0;
		AlignedInsertOffset	  = 0;
	}
	uint32_t AlignedWriteSize	   = AlignedInsertOffset - m_CurrentInsertOffset + ChunkSize;
	m_CurrentInsertOffset		   = AlignedInsertOffset + ChunkSize;
	Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
	m_ActiveWriteBlocks.push_back(WriteBlockIndex);
	InsertLock.ReleaseNow();

	WriteBlock->Write(Data, ChunkSize, AlignedInsertOffset);
	m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed);

	Callback({.BlockIndex = WriteBlockIndex, .Offset = AlignedInsertOffset, .Size = ChunkSize});

	{
		RwLock::ExclusiveLockScope _(m_InsertLock);
		m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
	}
}

// Appends a batch of chunks, packing as many consecutive chunks as possible into one file write.
//
// Datas     - payloads to store, must not be empty; each must be > 0 and <= m_MaxBlockSize bytes.
// Alignment - alignment of each chunk's start offset inside the block, must be > 0.
// Callback  - invoked once per written range with the locations of the chunks in that range,
//             in the same order as in Datas.
//
// For every range, space is reserved under the exclusive insert lock; the chunks are then
// copied into a staging buffer and written with a single Write() outside the lock. While
// that happens the block index is listed in m_ActiveWriteBlocks.
//
// Throws std::runtime_error when no new block index can be allocated.
void
BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback)
{
	ZEN_TRACE_CPU("BlockStore::WriteChunks");

	ZEN_ASSERT(!Datas.empty());
	ZEN_ASSERT(Alignment > 0u);

	const size_t TotalCount	 = Datas.size();
	uint64_t	 TotalSize	 = 0;
	uint64_t	 LargestSize = 0;
	for (const IoBuffer& Data : Datas)
	{
		uint64_t Size = Data.GetSize();
		ZEN_ASSERT(Size > 0);
		ZEN_ASSERT(Size <= m_MaxBlockSize);
		TotalSize += Size;
		LargestSize = Max(LargestSize, Size);
	}

	// The staging buffer always fits the largest single chunk and is otherwise capped at 8 MiB
	const uint64_t		 MinSize	= Max(LargestSize, 8u * 1024u * 1024u);
	const uint64_t		 BufferSize = Min(TotalSize, MinSize);
	std::vector<uint8_t> Buffer(BufferSize);

	size_t Offset = 0;
	while (Offset < TotalCount)
	{
		size_t	 Count	   = 1;
		uint32_t RangeSize = gsl::narrow<uint32_t>(Datas[Offset].GetSize());

		RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
		uint32_t				   WriteBlockIndex	   = m_WriteBlockIndex.load(std::memory_order_acquire);
		uint32_t				   AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment);
		// Widen before adding so the sum cannot wrap in 32 bits before it is compared
		if ((!m_WriteBlock) || ((static_cast<uint64_t>(AlignedInsertOffset) + RangeSize) > m_MaxBlockSize))
		{
			if (m_WriteBlock)
			{
				// Flush the block we are leaving, same as WriteChunk() does when it rolls over
				m_WriteBlock->Flush();
			}
			std::filesystem::path BlockPath;
			WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath);
			if (WriteBlockIndex == (uint32_t)m_MaxBlockCount)
			{
				throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
			}
			Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
			NewBlockFile->Create(m_MaxBlockSize);

			m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
			m_WriteBlock				   = NewBlockFile;
			m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
			m_CurrentInsertOffset = 0;
			AlignedInsertOffset	  = 0;
		}
		// Extend the range with following chunks while they fit in both the block and the buffer
		while (Offset + Count < TotalCount)
		{
			uint32_t NextRangeSize = gsl::narrow<uint32_t>(RoundUp(RangeSize, Alignment) + Datas[Offset + Count].GetSize());
			if ((static_cast<uint64_t>(AlignedInsertOffset) + NextRangeSize) > m_MaxBlockSize || (NextRangeSize > BufferSize))
			{
				break;
			}
			Count++;
			RangeSize = NextRangeSize;
		}
		m_CurrentInsertOffset		   = AlignedInsertOffset + RangeSize;
		Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
		m_ActiveWriteBlocks.push_back(WriteBlockIndex);
		InsertLock.ReleaseNow();

		// Remove the active-write marker when this iteration ends, also when the write or the
		// callback throws - otherwise the block would stay flagged as being written to.
		const auto ActiveWriteGuard = MakeGuard([&] {
			RwLock::ExclusiveLockScope _(m_InsertLock);
			m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
		});

		{
			MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
			for (size_t Index = 0; Index < Count; Index++)
			{
				MemoryView SourceBuffer = Datas[Index + Offset];
				WriteBuffer.CopyFrom(SourceBuffer);
				WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
			}
			WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
		}

		m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);

		uint32_t						ChunkOffset = AlignedInsertOffset;
		std::vector<BlockStoreLocation> Locations(Count);
		for (size_t Index = 0; Index < Count; Index++)
		{
			uint32_t ChunkSize = gsl::narrow<uint32_t>(Datas[Offset + Index].GetSize());
			Locations[Index]   = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset, .Size = ChunkSize};
			ChunkOffset += gsl::narrow<uint32_t>(RoundUp(ChunkSize, Alignment));
		}
		Callback(Locations);

		Offset += Count;
	}
}

// Captures, under the shared insert lock, which block indexes exist and which of them are
// being written to right now (blocks with writes in flight plus the current write block).
BlockStore::ReclaimSnapshotState
BlockStore::GetReclaimSnapshotState()
{
	ReclaimSnapshotState	State;
	RwLock::SharedLockScope _(m_InsertLock);
	for (uint32_t BlockIndex : m_ActiveWriteBlocks)
	{
		State.m_ActiveWriteBlocks.insert(BlockIndex);
	}
	if (m_WriteBlock)
	{
		State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex.load());
	}
	// Iterate by reference: only the key is needed, so there is no reason to copy each entry
	// (and the block reference it holds).
	for (const auto& It : m_ChunkBlocks)
	{
		State.m_BlockIndexes.insert(It.first);
	}
	return State;
}

// Looks up the block referenced by Location and returns a buffer for the chunk.
// An empty IoBuffer is returned when the block is not registered, the block reference is
// null, or the buffer obtained from the block does not have the requested size.
IoBuffer
BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
{
	ZEN_TRACE_CPU("BlockStore::TryGetChunk");
	RwLock::SharedLockScope InsertLock(m_InsertLock);

	const auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex);
	if (BlockIt == m_ChunkBlocks.end())
	{
		return IoBuffer();
	}

	const Ref<BlockStoreFile>& Block = BlockIt->second;
	if (!Block)
	{
		return IoBuffer();
	}

	IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size);
	if (Chunk.GetSize() != Location.Size)
	{
		return IoBuffer();
	}
	return Chunk;
}

// Flushes the current write block.
//
// ForceNewBlock == false: flush the write block (if any) under the shared insert lock.
// ForceNewBlock == true : under the exclusive insert lock, and only when something has been
//                         written (m_CurrentInsertOffset > 0), flush the write block and then
//                         drop it and reset the insert offset.
void
BlockStore::Flush(bool ForceNewBlock)
{
	ZEN_TRACE_CPU("BlockStore::Flush");

	if (!ForceNewBlock)
	{
		RwLock::SharedLockScope _(m_InsertLock);
		if (m_WriteBlock)
		{
			m_WriteBlock->Flush();
		}
		return;
	}

	RwLock::ExclusiveLockScope _(m_InsertLock);
	if (m_CurrentInsertOffset == 0)
	{
		// Nothing written to the current block, keep it
		return;
	}
	if (m_WriteBlock)
	{
		m_WriteBlock->Flush();
	}
	m_WriteBlock = nullptr;
	m_CurrentInsertOffset = 0;
}

// Garbage collects blocks: chunks to keep are moved into freshly allocated blocks and the
// blocks they came from are removed.
//
// Snapshot            - block indexes known (and being written to) when the collection was
//                       planned; blocks not in the snapshot are left untouched.
// ChunkLocations      - location of every chunk the caller tracks.
// KeepChunkIndexes    - indexes into ChunkLocations of the chunks that must survive.
// PayloadAlignment    - alignment used for chunk offsets in the newly written blocks.
// DryRun              - when true only the statistics are logged, nothing is changed.
// ChangeCallback      - called with (moved chunks and their new locations, indexes of deleted
//                       chunks) so the caller can update its index.
// DiskReserveCallback - called when free disk space is below m_MaxBlockSize; returns the
//                       number of bytes it released.
//
// Errors do not propagate: std::system_error, std::bad_alloc and std::exception raised while
// compacting are logged and swallowed. Several failure paths inside the compaction return
// early after logging.
//
// NOTE(review): the timing values are collected in microseconds (GetElapsedTimeUs) but are
// formatted with NiceLatencyNs in the summary log - confirm which unit that helper expects.
void
BlockStore::ReclaimSpace(const ReclaimSnapshotState&			Snapshot,
						 const std::vector<BlockStoreLocation>& ChunkLocations,
						 const ChunkIndexArray&					KeepChunkIndexes,
						 uint32_t								PayloadAlignment,
						 bool									DryRun,
						 const ReclaimCallback&					ChangeCallback,
						 const ClaimDiskReserveCallback&		DiskReserveCallback)
{
	ZEN_TRACE_CPU("BlockStore::ReclaimSpace");

	// WriteBlock* accumulates time spent holding the shared insert lock,
	// ReadBlock* accumulates time spent holding the exclusive insert lock.
	uint64_t WriteBlockTimeUs		 = 0;
	uint64_t WriteBlockLongestTimeUs = 0;
	uint64_t ReadBlockTimeUs		 = 0;
	uint64_t ReadBlockLongestTimeUs	 = 0;
	uint64_t TotalChunkCount		 = ChunkLocations.size();
	uint64_t DeletedSize			 = 0;
	uint64_t OldTotalSize			 = 0;
	uint64_t NewTotalSize			 = 0;

	uint64_t MovedCount	  = 0;
	uint64_t DeletedCount = 0;

	Stopwatch  TotalTimer;
	// Summary is logged on every exit path
	const auto _ = MakeGuard([&] {
		ZEN_DEBUG(
			"reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
			"{} "
			"of {} "
			"chunks ({}).",
			m_BlocksBasePath,
			NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
			NiceLatencyNs(WriteBlockTimeUs),
			NiceLatencyNs(WriteBlockLongestTimeUs),
			NiceLatencyNs(ReadBlockTimeUs),
			NiceLatencyNs(ReadBlockLongestTimeUs),
			NiceBytes(DeletedSize),
			DeletedCount,
			MovedCount,
			TotalChunkCount,
			NiceBytes(OldTotalSize));
	});

	size_t BlockCount = Snapshot.m_BlockIndexes.size();
	if (BlockCount == 0)
	{
		ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath);
		return;
	}

	tsl::robin_set<size_t> KeepChunkMap;
	KeepChunkMap.reserve(KeepChunkIndexes.size());
	for (size_t KeepChunkIndex : KeepChunkIndexes)
	{
		KeepChunkMap.insert(KeepChunkIndex);
	}

	// Per block: which chunk indexes to keep and which to delete. Both vectors are indexed by
	// the value stored in BlockIndexToChunkMapIndex.
	tsl::robin_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
	std::vector<ChunkIndexArray>	 BlockKeepChunks;
	std::vector<ChunkIndexArray>	 BlockDeleteChunks;

	BlockIndexToChunkMapIndex.reserve(BlockCount);
	BlockKeepChunks.reserve(BlockCount);
	BlockDeleteChunks.reserve(BlockCount);
	size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;

	size_t DeleteCount = 0;
	for (size_t Index = 0; Index < TotalChunkCount; ++Index)
	{
		const BlockStoreLocation& Location = ChunkLocations[Index];
		if (!Snapshot.m_BlockIndexes.contains(Location.BlockIndex))
		{
			// We did not know about the block when we took the snapshot, don't touch it
			continue;
		}
		OldTotalSize += Location.Size;
		auto   BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
		size_t ChunkMapIndex = 0;
		if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
		{
			ChunkMapIndex								   = BlockKeepChunks.size();
			BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
			BlockKeepChunks.resize(ChunkMapIndex + 1);
			BlockKeepChunks.back().reserve(GuesstimateCountPerBlock);
			BlockDeleteChunks.resize(ChunkMapIndex + 1);
			BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock);
		}
		else
		{
			ChunkMapIndex = BlockIndexPtr->second;
		}

		if (KeepChunkMap.contains(Index))
		{
			ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex];
			IndexMap.push_back(Index);
			NewTotalSize += Location.Size;
			continue;
		}
		ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex];
		IndexMap.push_back(Index);
		DeleteCount++;
	}

	// Only blocks that lose at least one chunk need to be rewritten
	std::vector<uint32_t> BlocksToReWrite;
	BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
	for (const auto& Entry : BlockIndexToChunkMapIndex)
	{
		uint32_t			   BlockIndex	 = Entry.first;
		size_t				   ChunkMapIndex = Entry.second;
		const ChunkIndexArray& ChunkMap		 = BlockDeleteChunks[ChunkMapIndex];
		if (ChunkMap.empty())
		{
			continue;
		}
		BlocksToReWrite.push_back(BlockIndex);
	}

	{
		// Any known block not referenced should be added as well
		RwLock::SharedLockScope __(m_InsertLock);
		for (std::uint32_t BlockIndex : Snapshot.m_BlockIndexes)
		{
			if (!m_ChunkBlocks.contains(BlockIndex))
			{
				continue;
			}
			bool WasActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex);
			if (WasActiveWriteBlock)
			{
				continue;
			}
			if (BlockIndexToChunkMapIndex.contains(BlockIndex))
			{
				continue;
			}
			const size_t ChunkMapIndex			  = BlockKeepChunks.size();
			BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex;
			BlockKeepChunks.resize(ChunkMapIndex + 1);
			BlockDeleteChunks.resize(ChunkMapIndex + 1);
			BlocksToReWrite.push_back(BlockIndex);
		}
	}

	if (DryRun)
	{
		ZEN_DEBUG("garbage collect for '{}' DISABLED, found {} {} chunks of total {} {}",
				  m_BlocksBasePath,
				  DeleteCount,
				  NiceBytes(OldTotalSize - NewTotalSize),
				  TotalChunkCount,
				  OldTotalSize);
		return;
	}

	try
	{
		ZEN_TRACE_CPU("BlockStore::ReclaimSpace::Compact");
		Ref<BlockStoreFile> NewBlockFile;
		// If we leave with a half-written destination block, make sure it gets removed
		auto NewBlockFileGuard = MakeGuard([&]() {
			if (NewBlockFile && NewBlockFile->IsOpen())
			{
				ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
				m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed);
				NewBlockFile->MarkAsDeleteOnClose();
			}
		});

		uint64_t WriteOffset   = 0;
		uint32_t NewBlockIndex = 0;
		for (uint32_t BlockIndex : BlocksToReWrite)
		{
			bool IsActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex);

			const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];

			// Blocks that were being written to at snapshot time are never opened for moving
			Ref<BlockStoreFile> OldBlockFile;
			if (!IsActiveWriteBlock)
			{
				RwLock::SharedLockScope _i(m_InsertLock);
				Stopwatch				Timer;
				const auto				__ = MakeGuard([&] {
					 uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
					 WriteBlockTimeUs += ElapsedUs;
					 WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
				 });
				if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end())
				{
					OldBlockFile = It->second;
				}
			}

			ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex];
			if (KeepMap.empty())
			{
				// Nothing to keep: report the deletions and drop the whole block
				ZEN_TRACE_CPU("BlockStore::ReclaimSpace::DeleteBlock");

				const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
				for (size_t DeleteIndex : DeleteMap)
				{
					DeletedSize += ChunkLocations[DeleteIndex].Size;
				}
				ChangeCallback({}, DeleteMap);
				DeletedCount += DeleteMap.size();
				if (OldBlockFile)
				{
					RwLock::ExclusiveLockScope _i(m_InsertLock);
					Stopwatch				   Timer;
					const auto				   __ = MakeGuard([&] {
						uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
						ReadBlockTimeUs += ElapsedUs;
						ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
					});
					ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
					ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
					m_ChunkBlocks.erase(BlockIndex);
					m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
					OldBlockFile->MarkAsDeleteOnClose();
				}
				continue;
			}
			else if (!OldBlockFile && !IsActiveWriteBlock)
			{
				// If the block file pointed to does not exist, move any keep chunk them to deleted list
				ZEN_ERROR("Expected to find block {} in {} - this should never happen, marking {} entries as deleted.",
						  BlockIndex,
						  m_BlocksBasePath,
						  KeepMap.size());

				BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), KeepMap.begin(), KeepMap.end());
				KeepMap.clear();
			}
			else if (OldBlockFile && (OldBlockFile->FileSize() == 0))
			{
				// Block created to accommodate missing blocks
				ZEN_WARN("Missing block {} in {} - backing data for locations is missing, marking {} entries as deleted.",
						 BlockIndex,
						 m_BlocksBasePath,
						 KeepMap.size());

				BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), KeepMap.begin(), KeepMap.end());
				KeepMap.clear();
			}

			MovedChunksArray MovedChunks;
			if (OldBlockFile)
			{
				ZEN_TRACE_CPU("BlockStore::ReclaimSpace::MoveBlock");

				ZEN_INFO("Moving {} chunks from '{}' to new block", KeepMap.size(), GetBlockPath(m_BlocksBasePath, BlockIndex));

				uint64_t			 OldBlockSize = OldBlockFile->FileSize();
				std::vector<uint8_t> Chunk;
				for (const size_t& ChunkIndex : KeepMap)
				{
					const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
					if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize)
					{
						ZEN_WARN(
							"ReclaimSpace skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
							"size {}",
							m_BlocksBasePath,
							ChunkLocation.Offset,
							ChunkLocation.Size,
							OldBlockFile->GetPath(),
							OldBlockSize);
						continue;
					}
					Chunk.resize(ChunkLocation.Size);
					OldBlockFile->Read(Chunk.data(), ChunkLocation.Size, ChunkLocation.Offset);

					// Start a new destination block when there is none or the chunk does not fit
					if (!NewBlockFile || (WriteOffset + ChunkLocation.Size > m_MaxBlockSize))
					{
						uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);

						if (NewBlockFile)
						{
							ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
							NewBlockFile->Flush();
							NewBlockFile = nullptr;
						}
						{
							// Publish what has been moved so far before switching destination block.
							// Count only the chunks actually reported here; the remainder is counted
							// when it is reported after the loop.
							ChangeCallback(MovedChunks, {});
							MovedCount += MovedChunks.size();
							MovedChunks.clear();
							RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
							Stopwatch				   Timer;
							const auto				   ___ = MakeGuard([&] {
								uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
								ReadBlockTimeUs += ElapsedUs;
								ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
							});

							std::filesystem::path NewBlockPath;
							NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
							if (NextBlockIndex == (uint32_t)m_MaxBlockCount)
							{
								ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exceeded",
										  m_BlocksBasePath,
										  m_MaxBlockCount);
								return;
							}

							// Register the index right away so nobody else picks it; the file itself
							// is created further down once disk space has been checked.
							NewBlockFile				  = new BlockStoreFile(NewBlockPath);
							m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
						}

						std::error_code Error;
						DiskSpace		Space = DiskSpaceInfo(m_BlocksBasePath, Error);
						if (Error)
						{
							ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
							// The block was registered above but never created. Unregister it, as the
							// low-disk path below does, so no unopened block is left in the map.
							RwLock::ExclusiveLockScope _l(m_InsertLock);
							Stopwatch				   Timer;
							const auto				   __ = MakeGuard([&] {
								uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
								ReadBlockTimeUs += ElapsedUs;
								ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
							});
							m_ChunkBlocks.erase(NextBlockIndex);
							return;
						}
						if (Space.Free < m_MaxBlockSize)
						{
							uint64_t ReclaimedSpace = DiskReserveCallback();
							if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
							{
								ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
										 m_BlocksBasePath,
										 m_MaxBlockSize,
										 NiceBytes(Space.Free + ReclaimedSpace));
								RwLock::ExclusiveLockScope _l(m_InsertLock);
								Stopwatch				   Timer;
								const auto				   __ = MakeGuard([&] {
									uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
									ReadBlockTimeUs += ElapsedUs;
									ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
								});
								ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
								ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
								m_ChunkBlocks.erase(NextBlockIndex);
								return;
							}

							ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
									 m_BlocksBasePath,
									 ReclaimedSpace,
									 NiceBytes(Space.Free + ReclaimedSpace));
						}
						NewBlockFile->Create(m_MaxBlockSize);
						NewBlockIndex = NextBlockIndex;
						WriteOffset	  = 0;
					}

					NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset);
					MovedChunks.push_back(
						{ChunkIndex,
						 {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}});
					uint64_t OldOffset = WriteOffset;
					WriteOffset		   = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment);
					m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed);
				}
				Chunk.clear();
				if (NewBlockFile)
				{
					ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
					NewBlockFile->Flush();
				}
			}

			const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
			for (size_t DeleteIndex : DeleteMap)
			{
				DeletedSize += ChunkLocations[DeleteIndex].Size;
			}

			// Report the remaining moves and the deletions for this block before removing it
			ChangeCallback(MovedChunks, DeleteMap);
			MovedCount += MovedChunks.size();
			DeletedCount += DeleteMap.size();
			MovedChunks.clear();

			if (OldBlockFile)
			{
				RwLock::ExclusiveLockScope __(m_InsertLock);
				Stopwatch				   Timer;
				const auto				   ___ = MakeGuard([&] {
					uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
					ReadBlockTimeUs += ElapsedUs;
					ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
				});

				ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
				ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
				m_ChunkBlocks.erase(BlockIndex);
				m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
				OldBlockFile->MarkAsDeleteOnClose();
			}
		}
		if (NewBlockFile)
		{
			// Completed normally: release our reference so the guard above leaves the file alone
			ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
			NewBlockFile->Flush();
			NewBlockFile = nullptr;
		}
	}
	catch (const std::system_error& SystemError)
	{
		if (IsOOM(SystemError.code()))
		{
			ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", m_BlocksBasePath, SystemError.what());
		}
		else if (IsOOD(SystemError.code()))
		{
			ZEN_WARN("reclaiming space for '{}' ran out of disk space: '{}'", m_BlocksBasePath, SystemError.what());
		}
		else
		{
			ZEN_ERROR("reclaiming space for '{}' failed with system error exception: '{}'", m_BlocksBasePath, SystemError.what());
		}
	}
	catch (const std::bad_alloc& BadAlloc)
	{
		ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", m_BlocksBasePath, BadAlloc.what());
	}
	catch (const std::exception& ex)
	{
		ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what());
	}
}

// Visits every chunk referenced by InChunkIndexes (indexes into ChunkLocations). The block to read is
// taken from the first referenced location and all referenced chunks are asserted to share it.
//
// Chunks are processed in ascending offset order. Neighbouring chunks that are no larger than
// LargeSizeLimit are fetched with a single Read() into a shared buffer and handed to SmallSizeCallback
// one by one; any other chunk is handed to LargeSizeCallback together with the block file.
// A LargeSizeLimit of 0 makes the small chunk window size act as the limit.
//
// Chunks that cannot be served (block not registered, zero size, or extending past the end of the
// block file) are reported as SmallSizeCallback(ChunkIndex, nullptr, 0).
//
// Returns false as soon as any callback returns false, true once every chunk has been visited.
bool
BlockStore::IterateBlock(std::span<const BlockStoreLocation>   ChunkLocations,
						 std::span<const size_t>			   InChunkIndexes,
						 const IterateChunksSmallSizeCallback& SmallSizeCallback,
						 const IterateChunksLargeSizeCallback& LargeSizeCallback,
						 uint64_t							   LargeSizeLimit)
{
	if (InChunkIndexes.empty())
	{
		return true;
	}

	// The window size doubles as the size of the read buffer allocated below
	uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit);
	if (LargeSizeLimit == 0u)
	{
		LargeSizeLimit = IterateSmallChunkWindowSize;
	}
	else
	{
		// No point in a window larger than what all chunks (plus gaps) could ever occupy
		IterateSmallChunkWindowSize =
			Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize);
	}

	uint32_t			BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex;
	std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end());
	std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
		return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset;
	});

	// Counts how many consecutive (offset sorted) chunks, starting at StartIndexOffset, can be served by one
	// read. The run ends at a chunk larger than LargeSizeLimit, at a gap of IterateSmallChunkMaxGapSize or
	// more, or when the run would no longer fit in the window. A result of 0 means the first chunk itself
	// does not qualify.
	auto GetNextRange = [LargeSizeLimit, IterateSmallChunkWindowSize, &ChunkLocations](std::span<const size_t> ChunkIndexes,
																					   size_t				   StartIndexOffset) -> size_t {
		size_t					  ChunkCount	= 0;
		size_t					  StartIndex	= ChunkIndexes[StartIndexOffset];
		const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
		uint64_t				  StartOffset	= StartLocation.Offset;
		uint64_t				  LastEnd		= StartOffset + StartLocation.Size;
		while (StartIndexOffset + ChunkCount < ChunkIndexes.size())
		{
			size_t					  NextIndex = ChunkIndexes[StartIndexOffset + ChunkCount];
			const BlockStoreLocation& Location	= ChunkLocations[NextIndex];
			ZEN_ASSERT(Location.BlockIndex == StartLocation.BlockIndex);
			if (Location.Size > LargeSizeLimit)
			{
				break;
			}
			if (Location.Offset >= (LastEnd + IterateSmallChunkMaxGapSize))
			{
				break;
			}
			if ((Location.Offset + Location.Size) - StartOffset > IterateSmallChunkWindowSize)
			{
				break;
			}
			LastEnd = Location.Offset + Location.Size;
			++ChunkCount;
		}
		return ChunkCount;
	};

	// The lock is only held while looking up the block; the Ref taken below keeps the file object alive afterwards
	RwLock::SharedLockScope InsertLock(m_InsertLock);
	auto					FindBlockIt = m_ChunkBlocks.find(BlockIndex);
	if (FindBlockIt == m_ChunkBlocks.end())
	{
		InsertLock.ReleaseNow();

		ZEN_LOG_SCOPE("block #{} not available", BlockIndex);

		// Block is not registered - report every requested chunk as missing
		for (size_t ChunkIndex : ChunkIndexes)
		{
			if (!SmallSizeCallback(ChunkIndex, nullptr, 0))
			{
				return false;
			}
		}
	}
	else
	{
		Ref<BlockStoreFile> BlockFile = FindBlockIt->second;
		ZEN_ASSERT(BlockFile);
		InsertLock.ReleaseNow();

		IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
		void*	 BufferBase = ReadBuffer.MutableData();

		size_t LocationIndexOffset = 0;
		while (LocationIndexOffset < ChunkIndexes.size())
		{
			size_t					  ChunkIndex	= ChunkIndexes[LocationIndexOffset];
			const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];

			const size_t BlockSize	= BlockFile->FileSize();
			const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset);
			if (RangeCount > 0)
			{
				size_t					  LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
				const BlockStoreLocation& LastLocation	 = ChunkLocations[LastChunkIndex];
				const uint64_t			  RangeSize		 = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;

				// Only request bytes that exist in the file. Chunks reaching past the end of the file are
				// reported as missing below and never touch the buffer, so a stale location must not be
				// able to turn the batched read into a read beyond the end of the block file.
				if (FirstLocation.Offset < BlockSize)
				{
					const uint64_t AvailableSize = BlockSize - FirstLocation.Offset;
					BlockFile->Read(BufferBase, RangeSize < AvailableSize ? RangeSize : AvailableSize, FirstLocation.Offset);
				}
				for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
				{
					size_t					  NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex];
					const BlockStoreLocation& ChunkLocation	 = ChunkLocations[NextChunkIndex];
					if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize))
					{
						ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
									  ChunkLocation.Offset,
									  ChunkLocation.Size,
									  BlockIndex,
									  BlockSize);

						if (!SmallSizeCallback(NextChunkIndex, nullptr, 0))
						{
							return false;
						}
						continue;
					}
					// The buffer starts at FirstLocation.Offset, so rebase the chunk offset onto it
					void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
					if (!SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size))
					{
						return false;
					}
				}
				LocationIndexOffset += RangeCount;
				continue;
			}
			// RangeCount == 0: the chunk did not qualify for a batched read, handle it on its own
			if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
			{
				ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
							  FirstLocation.Offset,
							  FirstLocation.Size,
							  BlockIndex,
							  BlockSize);

				// Honour a stop request here as well, just like every other callback invocation does
				if (!SmallSizeCallback(ChunkIndex, nullptr, 0))
				{
					return false;
				}
				LocationIndexOffset++;
				continue;
			}
			if (!LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size))
			{
				return false;
			}
			LocationIndexOffset++;
		}
	}
	return true;
}

// Buckets the indexes of ChunkLocations by the block each location refers to and invokes Callback once
// per block with (block index, indexes of the chunks in that block). Blocks are visited in the order in
// which they first appear in ChunkLocations, and the indexes inside a bucket keep their input order.
//
// Returns false as soon as Callback returns false, otherwise true.
bool
BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback)
{
	ZEN_TRACE_CPU("BlockStore::IterateChunks");

	Stopwatch Timer;
	auto	  _ = MakeGuard([&]() {
		 ZEN_DEBUG("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
	 });

	ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath);

	// Block index -> position of that block's bucket inside BlocksChunks
	std::unordered_map<uint32_t, size_t> BucketOfBlock;
	std::vector<std::vector<size_t>>	 BlocksChunks;

	const size_t LocationCount = ChunkLocations.size();
	for (size_t LocationIndex = 0; LocationIndex != LocationCount; ++LocationIndex)
	{
		const auto [Slot, IsFirstSighting] = BucketOfBlock.try_emplace(ChunkLocations[LocationIndex].BlockIndex, BlocksChunks.size());
		if (IsFirstSighting)
		{
			// First chunk seen for this block: open the bucket the map entry now points at
			BlocksChunks.emplace_back();
		}
		BlocksChunks[Slot->second].push_back(LocationIndex);
	}

	for (auto& BlockChunks : BlocksChunks)
	{
		ZEN_ASSERT(!BlockChunks.empty());
		const uint32_t OwningBlock = ChunkLocations[BlockChunks.front()].BlockIndex;
		const bool	   KeepGoing   = Callback(OwningBlock, BlockChunks);
		if (!KeepGoing)
		{
			return false;
		}
	}
	return true;
}

// Rewrites the blocks handed out by CompactState.IterateBlocks so that only their "keep" chunks survive.
//
// For each such block the kept chunks are copied, in ascending offset order, into newly allocated block
// files. Every source block starts a fresh target block, and another one is started whenever the next
// chunk would not fit within m_MaxBlockSize. Afterwards the source block is marked for delete and
// removed from m_ChunkBlocks. Blocks that are unknown or already deleted are skipped with a warning; a
// request to rewrite the active write block stops the compaction.
//
//   PayloadAlignment    - alignment applied to the write offset following each copied chunk
//   ChangeCallback      - receives the chunks moved since the last report and the number of bytes freed
//                         (removed minus added, clamped at 0); returning false stops the compaction
//   DiskReserveCallback - invoked when the free disk space is below m_MaxBlockSize; its return value is
//                         counted as additionally available space
//   LogPrefix           - prepended to every log message
void
BlockStore::CompactBlocks(const BlockStoreCompactState&	  CompactState,
						  uint32_t						  PayloadAlignment,
						  const CompactCallback&		  ChangeCallback,
						  const ClaimDiskReserveCallback& DiskReserveCallback,
						  std::string_view				  LogPrefix)
{
	ZEN_TRACE_CPU("BlockStore::CompactBlocks");

	uint64_t DeletedSize = 0;
	uint64_t MovedCount	 = 0;
	uint64_t MovedSize	 = 0;

	Stopwatch  TotalTimer;
	const auto _ = MakeGuard([&] {
		ZEN_DEBUG("{}Compact blocks for '{}' DONE after {}, deleted {} and moved {} chunks ({}) ",
				  LogPrefix,
				  m_BlocksBasePath,
				  NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
				  NiceBytes(DeletedSize),
				  MovedCount,
				  NiceBytes(MovedSize));
	});

	uint32_t		 NewBlockIndex = 0;
	MovedChunksArray MovedChunks;

	uint64_t AddedSize	 = 0;
	uint64_t RemovedSize = 0;

	Ref<BlockStoreFile>				 NewBlockFile;
	std::unique_ptr<BasicFileWriter> TargetFileBuffer;
	// Cleanup for early exits: if a target block is still pending when we leave, unregister it and drop its file
	auto							 NewBlockFileGuard = MakeGuard([&]() {
		TargetFileBuffer.reset();
		if (NewBlockFile)
		{
			{
				RwLock::ExclusiveLockScope _l(m_InsertLock);
				// find() instead of operator[] so that a missing key is not inserted as an empty entry
				auto It = m_ChunkBlocks.find(NewBlockIndex);
				if (It != m_ChunkBlocks.end() && It->second == NewBlockFile)
				{
					m_ChunkBlocks.erase(NewBlockIndex);
				}
			}
			if (NewBlockFile->IsOpen())
			{
				ZEN_DEBUG("{}Dropping incomplete cas block store file '{}'", LogPrefix, NewBlockFile->GetPath());
				NewBlockFile->MarkAsDeleteOnClose();
			}
		}
	});

	// Hands the accumulated moves/removals to ChangeCallback and resets the accumulators.
	// Returns the callback's verdict (true = keep going); true when there was nothing to report.
	auto ReportChanges = [&]() -> bool {
		bool Continue = true;
		if (!MovedChunks.empty() || RemovedSize > 0)
		{
			Continue = ChangeCallback(MovedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0);
			DeletedSize += RemovedSize;
			RemovedSize = 0;
			AddedSize	= 0;
			MovedCount += MovedChunks.size();
			MovedChunks.clear();
		}
		return Continue;
	};

	CompactState.IterateBlocks([&](uint32_t								  BlockIndex,
								   const std::vector<size_t>&			  KeepChunkIndexes,
								   const std::vector<BlockStoreLocation>& ChunkLocations) -> bool {
		ZEN_TRACE_CPU("BlockStore::CompactBlock");
		Ref<BlockStoreFile> OldBlockFile;
		{
			RwLock::SharedLockScope _(m_InsertLock);
			if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock)
			{
				ZEN_ERROR("{}compact Block was requested to rewrite the currently active write block in '{}', Block index {}",
						  LogPrefix,
						  m_BlocksBasePath,
						  BlockIndex);
				return false;
			}
			auto It = m_ChunkBlocks.find(BlockIndex);
			if (It == m_ChunkBlocks.end())
			{
				ZEN_WARN("{}compact Block was requested to rewrite an unknown block in '{}', Block index {}",
						 LogPrefix,
						 m_BlocksBasePath,
						 BlockIndex);
				return true;
			}
			if (!It->second)
			{
				ZEN_WARN("{}compact Block was requested to rewrite a deleted block in '{}', Block index {}",
						 LogPrefix,
						 m_BlocksBasePath,
						 BlockIndex);
				return true;
			}
			OldBlockFile = It->second;
		}
		ZEN_ASSERT(OldBlockFile);

		uint64_t OldBlockSize = OldBlockFile->FileSize();

		if (KeepChunkIndexes.empty())
		{
			ZEN_INFO("{}dropped all chunks from '{}', freeing {}",
					 LogPrefix,
					 GetBlockPath(m_BlocksBasePath, BlockIndex).filename(),
					 NiceBytes(OldBlockSize));
		}
		else
		{
			// Copy in ascending offset order so the source block is read front to back
			std::vector<size_t> SortedChunkIndexes(KeepChunkIndexes);
			std::sort(SortedChunkIndexes.begin(), SortedChunkIndexes.end(), [&ChunkLocations](size_t Lhs, size_t Rhs) {
				return ChunkLocations[Lhs].Offset < ChunkLocations[Rhs].Offset;
			});
			BasicFileBuffer SourceFileBuffer(OldBlockFile->GetBasicFile(), Min(65536u, OldBlockSize));

			uint64_t			 WriteOffset		 = m_MaxBlockSize + 1u;	 // Force detect a new block
			uint64_t			 WrittenBytesToBlock = 0;
			uint64_t			 MovedFromBlock		 = 0;
			std::vector<uint8_t> Chunk;
			for (const size_t& ChunkIndex : SortedChunkIndexes)
			{
				const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
				if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize)
				{
					ZEN_WARN(
						"{}compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
						"size {}",
						LogPrefix,
						m_BlocksBasePath,
						ChunkLocation.Offset,
						ChunkLocation.Size,
						OldBlockFile->GetPath(),
						OldBlockSize);
					continue;
				}

				Chunk.resize(ChunkLocation.Size);
				SourceFileBuffer.Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);

				if ((WriteOffset + Chunk.size()) > m_MaxBlockSize)
				{
					// The chunk does not fit in the current target block: finish that block (if any) and start a new one
					TargetFileBuffer.reset();
					if (NewBlockFile)
					{
						ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
						NewBlockFile->Flush();
						uint64_t NewBlockSize = NewBlockFile->FileSize();
						MovedSize += NewBlockSize;
						NewBlockFile = nullptr;

						ZEN_ASSERT(!MovedChunks.empty() ||
								   (RemovedSize > 0 || OldBlockSize == 0));	 // We should not have a new block if we haven't moved anything

						ZEN_INFO("{}wrote block {} ({})",
								 LogPrefix,
								 GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(),
								 NiceBytes(NewBlockSize));

						if (!ReportChanges())
						{
							return false;
						}
					}

					uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
					{
						RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
						std::filesystem::path	   NewBlockPath;
						NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
						if (NextBlockIndex == (uint32_t)m_MaxBlockCount)
						{
							ZEN_ERROR("{}unable to allocate a new block in '{}', count limit {} exeeded",
									  LogPrefix,
									  m_BlocksBasePath,
									  static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
							return false;
						}

						NewBlockFile				  = new BlockStoreFile(NewBlockPath);
						m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
						// Record the index together with the registration so that NewBlockFileGuard removes the
						// right entry should anything below (disk reserve callback, Create) throw
						NewBlockIndex = NextBlockIndex;
					}

					std::error_code Error;
					DiskSpace		Space = DiskSpaceInfo(m_BlocksBasePath, Error);
					if (Error)
					{
						ZEN_ERROR("{}get disk space in '{}' FAILED, reason: '{}'", LogPrefix, m_BlocksBasePath, Error.message());
						{
							RwLock::ExclusiveLockScope _l(m_InsertLock);
							ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
							m_ChunkBlocks.erase(NextBlockIndex);
						}
						ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
						NewBlockFile = nullptr;
						return false;
					}

					if (Space.Free < m_MaxBlockSize)
					{
						uint64_t ReclaimedSpace = DiskReserveCallback();
						if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
						{
							ZEN_WARN("{}garbage collect for '{}' FAILED, required disk space {}, free {}",
									 LogPrefix,
									 m_BlocksBasePath,
									 m_MaxBlockSize,
									 NiceBytes(Space.Free + ReclaimedSpace));
							{
								RwLock::ExclusiveLockScope _l(m_InsertLock);
								ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
								m_ChunkBlocks.erase(NextBlockIndex);
							}
							ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
							NewBlockFile = nullptr;
							return false;
						}

						ZEN_INFO("{}using gc reserve for '{}', reclaimed {}, disk free {}",
								 LogPrefix,
								 m_BlocksBasePath,
								 ReclaimedSpace,
								 NiceBytes(Space.Free + ReclaimedSpace));
					}
					NewBlockFile->Create(m_MaxBlockSize);
					WriteOffset = 0;
					// Account for what went into the previous target block before starting to count anew
					AddedSize += WrittenBytesToBlock;
					WrittenBytesToBlock = 0;
					TargetFileBuffer	= std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(65536u, m_MaxBlockSize));
				}

				TargetFileBuffer->Write(Chunk.data(), ChunkLocation.Size, WriteOffset);
				MovedChunks.push_back(
					{ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}});
				WrittenBytesToBlock = WriteOffset + ChunkLocation.Size;

				MovedFromBlock += RoundUp(ChunkLocation.Offset + ChunkLocation.Size, PayloadAlignment) - ChunkLocation.Offset;
				WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment);
			}
			AddedSize += WrittenBytesToBlock;
			// MovedFromBlock includes alignment padding and may therefore exceed the file size; clamp so that
			// the logged number cannot wrap around
			ZEN_INFO("{}moved {} chunks ({}) from '{}' to new block, freeing {}",
					 LogPrefix,
					 KeepChunkIndexes.size(),
					 NiceBytes(MovedFromBlock),
					 GetBlockPath(m_BlocksBasePath, BlockIndex).filename(),
					 NiceBytes(OldBlockSize > MovedFromBlock ? OldBlockSize - MovedFromBlock : 0));
		}
		if (TargetFileBuffer)
		{
			TargetFileBuffer->Flush();
		}

		// Report the moves before the source block is dropped
		if (!ReportChanges())
		{
			return false;
		}

		{
			RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
			ZEN_DEBUG("{}marking cas block store file '{}' for delete ({})",
					  LogPrefix,
					  OldBlockFile->GetPath().filename(),
					  NiceBytes(OldBlockSize));
			OldBlockFile->MarkAsDeleteOnClose();
			m_ChunkBlocks.erase(BlockIndex);
			m_TotalSize.fetch_sub(OldBlockSize);
			RemovedSize += OldBlockSize;
		}
		return true;
	});

	// Finish the last target block; clearing NewBlockFile also disarms NewBlockFileGuard
	if (NewBlockFile)
	{
		ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
		NewBlockFile->Flush();
		uint64_t NewBlockSize = NewBlockFile->FileSize();
		MovedSize += NewBlockSize;
		NewBlockFile = nullptr;
		ZEN_INFO("{}wrote block {} ({})", LogPrefix, GetBlockPath(m_BlocksBasePath, NewBlockIndex).filename(), NiceBytes(NewBlockSize));
	}

	ReportChanges();
}

// Returns the extension, leading dot included, that GetBlockPath appends to block file names.
const char*
BlockStore::GetBlockFileExtension()
{
	static constexpr const char* BlockFileExtension = ".ucas";
	return BlockFileExtension;
}

std::filesystem::path
BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
{
	ExtendablePathBuilder<256> Path;

	char BlockHexString[9];
	ToHexNumber(BlockIndex, BlockHexString);

	Path.Append(BlocksBasePath);
	Path.AppendSeparator();
	Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
	Path.AppendSeparator();
	Path.Append(BlockHexString);
	Path.Append(GetBlockFileExtension());
	return Path.ToPath();
}

#if ZEN_WITH_TESTS

// Packing a BlockStoreLocation into a BlockStoreDiskLocation and unpacking it again with the same
// alignment must give back the original location, including at the extremes of each field.
TEST_CASE("blockstore.blockstoredisklocation")
{
	constexpr auto LargestSize = std::numeric_limits<uint32_t>::max();

	// All fields zero
	BlockStoreLocation Zero{.BlockIndex = 0, .Offset = 0, .Size = 0};
	CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4));

	// One field at its maximum, the others zero
	BlockStoreLocation MaxBlockIndex{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0};
	CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4));

	BlockStoreLocation MaxOffset{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0};
	CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4));

	BlockStoreLocation MaxSize{.BlockIndex = 0, .Offset = 0, .Size = LargestSize};
	CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4));

	// Several fields at their maximum at once
	BlockStoreLocation MaxBlockIndexAndOffset{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
											  .Offset	  = BlockStoreDiskLocation::MaxOffset * 4,
											  .Size		  = 0};
	CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4));

	BlockStoreLocation MaxAll{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
							  .Offset	  = BlockStoreDiskLocation::MaxOffset * 4,
							  .Size		  = LargestSize};
	CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4));

	// Same again with a 4096 byte alignment
	BlockStoreLocation MaxAll4096{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
								  .Offset	  = BlockStoreDiskLocation::MaxOffset * 4096,
								  .Size		  = LargestSize};
	CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096));

	// Values from the middle of each range
	BlockStoreLocation Middle{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2,
							  .Offset	  = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4,
							  .Size		  = LargestSize / 2};
	CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4));
}

// Exercises BlockStoreFile directly: create/write/read, re-opening an existing file, and the
// lifetime of the file on disk relative to chunk buffers handed out by GetChunk().
TEST_CASE("blockstore.blockfile")
{
	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path() / "blocks";
	CreateDirectories(RootDirectory);

	// Create a new file, write two 5 byte strings (terminator included) and read them back via GetChunk
	{
		BlockStoreFile File1(RootDirectory / "1");
		File1.Create(16384);
		// The size passed to Create is not expected to show up as the file size
		CHECK(File1.FileSize() == 0);
		File1.Write("data", 5, 0);
		IoBuffer DataChunk = File1.GetChunk(0, 5);
		File1.Write("boop", 5, 5);
		IoBuffer	BoopChunk = File1.GetChunk(5, 5);
		const char* Data	  = static_cast<const char*>(DataChunk.GetData());
		CHECK(std::string(Data) == "data");
		const char* Boop = static_cast<const char*>(BoopChunk.GetData());
		CHECK(std::string(Boop) == "boop");
		File1.Flush();
		CHECK(File1.FileSize() == 10);
	}
	// Re-open the same file and verify the content through both Read() and GetChunk()
	{
		BlockStoreFile File1(RootDirectory / "1");
		File1.Open();

		// 5 bytes are read, so the stored terminator ends up in the buffer as well
		char DataRaw[5];
		File1.Read(DataRaw, 5, 0);
		CHECK(std::string(DataRaw) == "data");
		IoBuffer DataChunk = File1.GetChunk(0, 5);

		char BoopRaw[5];
		File1.Read(BoopRaw, 5, 5);
		CHECK(std::string(BoopRaw) == "boop");

		IoBuffer	BoopChunk = File1.GetChunk(5, 5);
		const char* Data	  = static_cast<const char*>(DataChunk.GetData());
		CHECK(std::string(Data) == "data");
		const char* Boop = static_cast<const char*>(BoopChunk.GetData());
		CHECK(std::string(Boop) == "boop");
	}

	// Chunk buffers must stay readable after the BlockStoreFile object that produced them is gone,
	// and the file itself must stay on disk
	{
		IoBuffer DataChunk;
		IoBuffer BoopChunk;

		{
			BlockStoreFile File1(RootDirectory / "1");
			File1.Open();
			DataChunk = File1.GetChunk(0, 5);
			BoopChunk = File1.GetChunk(5, 5);
		}

		CHECK(std::filesystem::exists(RootDirectory / "1"));

		const char* Data = static_cast<const char*>(DataChunk.GetData());
		CHECK(std::string(Data) == "data");
		const char* Boop = static_cast<const char*>(BoopChunk.GetData());
		CHECK(std::string(Boop) == "boop");
	}
	CHECK(std::filesystem::exists(RootDirectory / "1"));

	// With MarkAsDeleteOnClose() the chunk buffers must still be readable after the BlockStoreFile is
	// gone, but the file is expected to have disappeared once the buffers are released as well
	{
		IoBuffer DataChunk;
		IoBuffer BoopChunk;

		{
			BlockStoreFile File1(RootDirectory / "1");
			File1.Open();
			File1.MarkAsDeleteOnClose();
			DataChunk = File1.GetChunk(0, 5);
			BoopChunk = File1.GetChunk(5, 5);
		}

		const char* Data = static_cast<const char*>(DataChunk.GetData());
		CHECK(std::string(Data) == "data");
		const char* Boop = static_cast<const char*>(BoopChunk.GetData());
		CHECK(std::string(Boop) == "boop");
	}
	CHECK(!std::filesystem::exists(RootDirectory / "1"));
}

namespace blockstore::impl {
	BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, uint32_t PayloadAlignment)
	{
		BlockStoreLocation Location;
		Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; });
		CHECK(Location.Size == String.length());
		return Location;
	};

	std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location)
	{
		IoBuffer ChunkData = Store.TryGetChunk(Location);
		if (!ChunkData)
		{
			return "";
		}
		std::string AsString((const char*)ChunkData.Data(), ChunkData.Size());
		return AsString;
	};

	std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories)
	{
		DirectoryContent DirectoryContent;
		GetDirectoryContent(RootDir,
							DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) |
								(Directories ? DirectoryContent::IncludeDirsFlag : 0),
							DirectoryContent);
		std::vector<std::filesystem::path> Result;
		Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end());
		Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end());
		return Result;
	};

}  // namespace blockstore::impl

// Writes two batches of chunks through WriteChunks() into a store with 128 byte blocks and checks both
// the content read back and how the chunks were distributed over blocks.
TEST_CASE("blockstore.multichunks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory, 128, 1024);

	// Builds an IoBuffer::Wrap buffer over the characters of Text
	auto WrapString = [](std::string& Text) { return IoBuffer(IoBuffer::Wrap, Text.data(), Text.size()); };

	std::string FirstChunkData	= "0123456789012345678901234567890123456789012345678901234567890123";
	std::string SecondChunkData = "12345678901234567890123456789012345678901234567890123456";
	std::string ThirdChunkData	= "789012345678901234567890123456789012345678901234567890";

	std::vector<IoBuffer> MultiChunkData;
	MultiChunkData.push_back(WrapString(FirstChunkData));
	MultiChunkData.push_back(WrapString(SecondChunkData));
	MultiChunkData.push_back(WrapString(ThirdChunkData));

	BlockStoreLocation Locations[5];
	size_t			   ChunkOffset = 0;

	// Expectations for the first batch; they have to hold after the second batch as well
	auto VerifyFirstBatch = [&]() {
		CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
		CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
		CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
		CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
		CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);
	};

	// First batch: three chunks, the callback may deliver the locations in several pieces
	Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) {
		for (size_t Index = 0; Index < InLocations.size(); ++Index)
		{
			Locations[ChunkOffset++] = InLocations[Index];
		}
		CHECK(ChunkOffset <= MultiChunkData.size());
	});
	CHECK(ChunkOffset == 3);

	VerifyFirstBatch();

	// Second batch: two more chunks appended after the first three
	std::string FourthChunkData = "ABCDEFGHIJABCDEFGHIJ_23";
	std::string FifthChunkData =
		"ABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJ_93";

	MultiChunkData.clear();
	MultiChunkData.push_back(WrapString(FourthChunkData));
	MultiChunkData.push_back(WrapString(FifthChunkData));

	Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) {
		for (size_t Index = 0; Index < InLocations.size(); ++Index)
		{
			CHECK(ChunkOffset < 5);
			Locations[ChunkOffset++] = InLocations[Index];
		}
	});
	CHECK(ChunkOffset == 5);

	VerifyFirstBatch();

	CHECK(Locations[3].BlockIndex == Locations[2].BlockIndex);
	CHECK(ReadChunkAsString(Store, Locations[3]) == FourthChunkData);
	CHECK(Locations[4].BlockIndex != Locations[3].BlockIndex);
	CHECK(ReadChunkAsString(Store, Locations[4]) == FifthChunkData);
}

// Single chunk writes into a store with 128 byte blocks: chunks must read back unchanged, and a chunk
// that does not fit next to the first two must land in a different block.
TEST_CASE("blockstore.chunks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory, 128, 1024);

	// Nothing has been written yet, so no chunk can be fetched
	IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
	CHECK(!BadChunk);

	std::string FirstChunkData	= "This is the data of the first chunk that we will write";
	std::string SecondChunkData = "This is the data for the second chunk that we will write";
	std::string ThirdChunkData =
		"This is a much longer string that will not fit in the first block so it should be placed in the second block";

	BlockStoreLocation FirstChunkLocation  = WriteStringAsChunk(Store, FirstChunkData, 4);
	BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);

	// Checks the two chunks written so far
	auto VerifyFirstTwoChunks = [&]() {
		CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
		CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);
	};
	VerifyFirstTwoChunks();

	BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4);
	CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex);

	// The earlier chunks must be unaffected by the third write
	VerifyFirstTwoChunks();
	CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData);
}

// Verifies that ReclaimSpace() drops a block none of the given locations refer to, that such a block
// survives on disk while a chunk buffer still references it, and that SyncExistingBlocksOnDisk() with a
// known-but-missing block index puts a block file back without making the old chunk readable again.
TEST_CASE("blockstore.clean.stray.blocks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", 128, 1024);

	std::string		   FirstChunkData	   = "This is the data of the first chunk that we will write";
	BlockStoreLocation FirstChunkLocation  = WriteStringAsChunk(Store, FirstChunkData, 4);
	std::string		   SecondChunkData	   = "This is the data for the second chunk that we will write";
	BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
	std::string		   ThirdChunkData =
		"This is a much longer string that will not fit in the first block so it should be placed in the second block";
	BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4);

	Store.Close();

	// Re-open the store: two block files are expected on disk and the third chunk must be readable
	Store.Initialize(RootDirectory / "store", 128, 1024);
	CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2);
	IoBuffer ThirdChunk = Store.TryGetChunk(ThirdChunkLocation);
	CHECK(ThirdChunk);

	// Reclaim space should delete unreferenced block
	Store.ReclaimSpace(Store.GetReclaimSnapshotState(), {FirstChunkLocation, SecondChunkLocation}, {0, 1}, 4, false);
	// Block lives on as long as we reference it via ThirdChunk
	CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2);
	// Dropping the last buffer that references the block lets the file go away
	ThirdChunk = {};
	CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1);
	ThirdChunk = Store.TryGetChunk(ThirdChunkLocation);
	CHECK(!ThirdChunk);

	// Recreate a fake block for a missing chunk location
	BlockStore::BlockIndexSet KnownBlocks;
	KnownBlocks.Add(FirstChunkLocation.BlockIndex);
	KnownBlocks.Add(SecondChunkLocation.BlockIndex);
	KnownBlocks.Add(ThirdChunkLocation.BlockIndex);
	Store.SyncExistingBlocksOnDisk(KnownBlocks);

	// We create a fake block for the location - we should still not be able to get the chunk
	CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2);
	ThirdChunk = Store.TryGetChunk(ThirdChunkLocation);
	CHECK(!ThirdChunk);
}

// Flush(/*ForceNewBlock*/ true) between writes must make every following write start a new block file:
// three writes separated by two forced flushes are expected to leave three files on disk.
TEST_CASE("blockstore.flush.force.new.block")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", 128, 1024);

	std::string FirstChunkData	= "This is the data of the first chunk that we will write";
	std::string SecondChunkData = "This is the data for the second chunk that we will write";
	std::string ThirdChunkData =
		"This is a much longer string that will not fit in the first block so it should be placed in the second block";

	// Write, then force the store over to a new block
	WriteStringAsChunk(Store, FirstChunkData, 4);
	Store.Flush(/*ForceNewBlock*/ true);

	WriteStringAsChunk(Store, SecondChunkData, 4);
	Store.Flush(/*ForceNewBlock*/ true);

	// No flush needed after the final write
	WriteStringAsChunk(Store, ThirdChunkData, 4);

	const auto StoreFiles = GetDirectoryContent(RootDirectory / "store", true, false);
	CHECK(StoreFiles.size() == 3);
}

// Runs IterateChunks() + IterateBlock() over a mix of valid and invalid locations and checks that each
// location is delivered through the expected callback:
//   0, 1    - small chunks, delivered with data through the small size callback
//   2       - chunk of twice the small chunk window size, delivered through the large size callback
//   3, 4, 5 - zero size, out of range and unknown block, delivered through the small size callback with no data
// The per block work is scheduled on a worker pool and awaited through a latch.
TEST_CASE("blockstore.iterate.chunks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", DefaultIterateSmallChunkWindowSize * 2, 1024);
	IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
	CHECK(!BadChunk);

	std::string		   FirstChunkData	  = "This is the data of the first chunk that we will write";
	BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);

	std::string		   SecondChunkData	   = "This is the data for the second chunk that we will write";
	BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
	Store.Flush(/*ForceNewBlock*/ false);

	std::string		   VeryLargeChunk(DefaultIterateSmallChunkWindowSize * 2, 'L');
	BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);

	BlockStoreLocation BadLocationZeroSize	 = {.BlockIndex = 0, .Offset = 0, .Size = 0};
	BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0,
												.Offset		= DefaultIterateSmallChunkWindowSize,
												.Size		= DefaultIterateSmallChunkWindowSize * 2};
	BlockStoreLocation BadBlockIndex		 = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024};

	WorkerThreadPool WorkerPool(4);

	// The position in this vector is the ChunkIndex the callbacks below switch on
	std::vector<BlockStoreLocation> Locations{FirstChunkLocation,
											  SecondChunkLocation,
											  VeryLargeChunkLocation,
											  BadLocationZeroSize,
											  BadLocationOutOfRange,
											  BadBlockIndex};
	// Starts at 1 so the latch cannot be released before all work has been scheduled; that initial count is
	// given back right before Wait()
	Latch							WorkLatch(1);
	Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
		WorkLatch.AddCount(1);
		// The indexes are copied into the work item rather than captured as a span
		WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
			auto _		  = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
			bool Continue = Store.IterateBlock(
				Locations,
				ChunkIndexes,
				[&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
					switch (ChunkIndex)
					{
						case 0:
							CHECK(Data);
							CHECK(Size == FirstChunkData.size());
							CHECK(std::string((const char*)Data, Size) == FirstChunkData);
							break;
						case 1:
							CHECK(Data);
							CHECK(Size == SecondChunkData.size());
							CHECK(std::string((const char*)Data, Size) == SecondChunkData);
							break;
						case 2:
							CHECK(false);
							break;
						case 3:
							CHECK(!Data);
							break;
						case 4:
							CHECK(!Data);
							break;
						case 5:
							CHECK(!Data);
							break;
						default:
							CHECK(false);
							break;
					}
					return true;
				},
				[&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
					switch (ChunkIndex)
					{
						case 0:
						case 1:
							CHECK(false);
							break;
						case 2:
							{
								CHECK(Size == VeryLargeChunk.size());
								// Owning buffer instead of new[]/delete[] so nothing leaks if streaming throws
								std::vector<char> Buffer(Size);
								size_t			  HashOffset = 0;
								File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t PartSize) {
									memcpy(Buffer.data() + HashOffset, Data, PartSize);
									HashOffset += PartSize;
								});
								CHECK(memcmp(Buffer.data(), VeryLargeChunk.data(), Size) == 0);
							}
							break;
						case 3:
							CHECK(false);
							break;
						case 4:
							CHECK(false);
							break;
						case 5:
							CHECK(false);
							break;
						default:
							CHECK(false);
							break;
					}
					return true;
				});
			CHECK(Continue);
		});
		return true;
	});
	WorkLatch.CountDown();
	WorkLatch.Wait();
}

// Exercises BlockStore::ReclaimSpace in four steps:
//   1. a call with every chunk kept and no callbacks supplied,
//   2. a call with every chunk kept where any callback invocation is a failure,
//   3. a call dropping the first DeleteChunkCount chunks, tracking moved/deleted callbacks,
//   4. a call with an empty keep list, which must report every remaining chunk as deleted.
TEST_CASE("blockstore.reclaim.space")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", 512, 1024);

	// Write ChunkCount random blobs of increasing size (57 + ChunkIndex bytes) and remember
	// both where the store placed each one and the hash of its content.
	constexpr size_t				ChunkCount = 200;
	constexpr size_t				Alignment  = 8;
	std::vector<BlockStoreLocation> ChunkLocations;
	std::vector<IoHash>				ChunkHashes;
	ChunkLocations.reserve(ChunkCount);
	ChunkHashes.reserve(ChunkCount);
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex);

		Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); });
		ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
	}

	// Start out by keeping every chunk index
	std::vector<size_t> ChunksToKeep;
	ChunksToKeep.reserve(ChunkLocations.size());
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		ChunksToKeep.push_back(ChunkIndex);
	}

	Store.Flush(/*ForceNewBlock*/ false);
	BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState();
	// NOTE(review): the fifth argument looks like a dry-run flag - confirm against the ReclaimSpace declaration
	Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true);

	// If we keep all the chunks we should not get any callbacks on moved/deleted stuff
	Store.ReclaimSpace(
		State1,
		ChunkLocations,
		ChunksToKeep,
		Alignment,
		false,
		[](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); },
		[]() {
			CHECK(false);
			return 0;
		});

	// Now drop the first DeleteChunkCount chunks and keep the rest
	size_t DeleteChunkCount = 38;
	ChunksToKeep.clear();
	for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		ChunksToKeep.push_back(ChunkIndex);
	}

	std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations;
	size_t							MovedChunkCount	  = 0;
	size_t							DeletedChunkCount = 0;
	Store.ReclaimSpace(
		State1,
		ChunkLocations,
		ChunksToKeep,
		Alignment,
		false,
		[&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
			// Only kept chunks may be reported as moved; record where they ended up
			for (const auto& MovedChunk : MovedChunks)
			{
				CHECK(MovedChunk.first >= DeleteChunkCount);
				NewChunkLocations[MovedChunk.first] = MovedChunk.second;
			}
			MovedChunkCount += MovedChunks.size();
			// Only dropped chunks may be reported as deleted
			for (size_t DeletedIndex : DeletedChunks)
			{
				CHECK(DeletedIndex < DeleteChunkCount);
			}
			DeletedChunkCount += DeletedChunks.size();
		},
		[]() {
			CHECK(false);
			return 0;
		});
	CHECK(MovedChunkCount <= DeleteChunkCount);
	CHECK(DeletedChunkCount == DeleteChunkCount);
	// From here on ChunkLocations only holds the surviving chunks (indices shifted down by DeleteChunkCount)
	ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end());

	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		// Every location is looked up (dropped ones included, their result is not asserted on),
		// but only kept chunks must be present with their original content.
		IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]);
		if (ChunkIndex >= DeleteChunkCount)
		{
			CHECK(ChunkBlock);
			IoHash VerifyHash = IoHash::HashBuffer(ChunkBlock.Data(), ChunkBlock.Size());
			CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
		}
	}

	// We need to take a new snapshot state since reclaiming space adds new blocks when compacting
	BlockStore::ReclaimSnapshotState State2 = Store.GetReclaimSnapshotState();
	NewChunkLocations						= ChunkLocations;
	MovedChunkCount							= 0;
	DeletedChunkCount						= 0;
	// Keep nothing: nothing may be moved and every surviving chunk must be reported as deleted
	Store.ReclaimSpace(
		State2,
		ChunkLocations,
		{},
		Alignment,
		false,
		[&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
			CHECK(MovedChunks.empty());
			DeletedChunkCount += DeletedChunks.size();
		},
		[]() {
			CHECK(false);
			return 0;
		});
	CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount);
}

// Hammers a BlockStore from a worker pool in three phases:
//   1. all chunks are written concurrently,
//   2. all chunks are read back and hash-verified concurrently,
//   3. a second round of writes runs interleaved with verification reads of the first round.
// The test thread polls an atomic completion counter between phases.
TEST_CASE("blockstore.thread.read.write")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", 1088, 1024);

	constexpr size_t	  ChunkCount = 1000;
	constexpr size_t	  Alignment	 = 8;
	std::vector<IoBuffer> Chunks;
	std::vector<IoHash>	  ChunkHashes;
	Chunks.reserve(ChunkCount);
	ChunkHashes.reserve(ChunkCount);
	// Payload sizes grow by one byte for every second chunk, starting at 57 bytes
	for (size_t PayloadIndex = 0; PayloadIndex < ChunkCount; ++PayloadIndex)
	{
		IoBuffer Payload = CreateRandomBlob(57 + PayloadIndex / 2);
		Chunks.push_back(Payload);
		ChunkHashes.push_back(IoHash::HashBuffer(Payload.Data(), Payload.Size()));
	}

	std::vector<BlockStoreLocation> ChunkLocations;
	ChunkLocations.resize(ChunkCount);

	WorkerThreadPool	WorkerPool(8);
	std::atomic<size_t> WorkCompleted = 0;

	// Polls until the workers have bumped the completion counter up to ExpectedCount
	auto WaitForCompleted = [&WorkCompleted](size_t ExpectedCount) {
		while (WorkCompleted < ExpectedCount)
		{
			Sleep(1);
		}
	};

	// Writes chunk number Index to the store and records the reported location in OutLocations[Index]
	auto WriteJob = [&Store, &Chunks, &WorkCompleted](size_t Index, std::vector<BlockStoreLocation>& OutLocations) {
		IoBuffer& Payload = Chunks[Index];
		Store.WriteChunk(Payload.Data(), Payload.Size(), Alignment, [&](const BlockStoreLocation& Location) {
			OutLocations[Index] = Location;
		});
		WorkCompleted.fetch_add(1);
	};

	// Reads chunk number Index back from its first-round location and compares the content hash
	auto VerifyJob = [&Store, &ChunkLocations, &ChunkHashes, &WorkCompleted](size_t Index) {
		IoBuffer StoredChunk = Store.TryGetChunk(ChunkLocations[Index]);
		CHECK(StoredChunk);
		IoHash StoredHash = IoHash::HashBuffer(StoredChunk.Data(), StoredChunk.Size());
		CHECK(StoredHash == ChunkHashes[Index]);
		WorkCompleted.fetch_add(1);
	};

	// Phase 1: concurrent writes
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		WorkerPool.ScheduleWork([&WriteJob, &ChunkLocations, ChunkIndex]() { WriteJob(ChunkIndex, ChunkLocations); });
	}
	WaitForCompleted(Chunks.size());

	// Phase 2: concurrent reads of everything written in phase 1
	WorkCompleted = 0;
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		WorkerPool.ScheduleWork([&VerifyJob, ChunkIndex]() { VerifyJob(ChunkIndex); });
	}
	WaitForCompleted(Chunks.size());

	// Phase 3: write every chunk a second time while verifying the first-round copies
	std::vector<BlockStoreLocation> SecondChunkLocations;
	SecondChunkLocations.resize(ChunkCount);
	WorkCompleted = 0;
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		WorkerPool.ScheduleWork([&WriteJob, &SecondChunkLocations, ChunkIndex]() { WriteJob(ChunkIndex, SecondChunkLocations); });
		WorkerPool.ScheduleWork([&VerifyJob, ChunkIndex]() { VerifyJob(ChunkIndex); });
	}
	// Two jobs were scheduled per chunk in this phase
	WaitForCompleted(Chunks.size() * 2);
}

// Exercises BlockStore::CompactBlocks. The shared preamble writes ChunkCount random chunks
// (57 + ChunkIndex / 2 bytes each) sequentially and records their locations and hashes; each
// SUBCASE then builds a BlockStoreCompactState describing which blocks to consider and which
// locations to keep, runs the compaction and validates the callbacks and the resulting store.
TEST_CASE("blockstore.compact.blocks")
{
	using namespace blockstore::impl;

	ScopedTemporaryDirectory TempDir;
	auto					 RootDirectory = TempDir.Path();

	BlockStore Store;
	Store.Initialize(RootDirectory / "store", 1088, 1024);

	constexpr size_t	  ChunkCount = 200;
	constexpr size_t	  Alignment	 = 8;
	std::vector<IoBuffer> Chunks;
	std::vector<IoHash>	  ChunkHashes;
	Chunks.reserve(ChunkCount);
	ChunkHashes.reserve(ChunkCount);
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex / 2);
		Chunks.push_back(Chunk);
		ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
	}

	std::vector<BlockStoreLocation> ChunkLocations;
	ChunkLocations.resize(ChunkCount);

	for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
	{
		IoBuffer& Chunk = Chunks[ChunkIndex];
		Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
	}

	// An empty compact state must not trigger any callback nor change the store size
	SUBCASE("touch nothing")
	{
		uint64_t PreSize = Store.TotalSize();
		CHECK(PreSize > 0);
		BlockStoreCompactState State;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray&, uint64_t) {
				CHECK(false);
				return true;
			},
			[]() {
				CHECK(false);
				return 0;
			});
		CHECK_EQ(PreSize, Store.TotalSize());
	}
	// Every block is included and no location is kept: the whole store content must be removed
	SUBCASE("keep nothing")
	{
		Store.Flush(true);

		uint64_t PreSize = Store.TotalSize();
		CHECK(PreSize > 0);
		BlockStoreCompactState State;
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			State.IncludeBlock(Location.BlockIndex);
		}
		uint64_t RemovedSize = 0;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
				RemovedSize += Removed;
				CHECK(Moved.empty());
				return true;
			},
			[]() { return 0; });
		CHECK_EQ(RemovedSize, PreSize);
		CHECK_EQ(0u, Store.TotalSize());
	}
	// Every block except the ones currently being written to is included with nothing kept;
	// only the active write block content may remain afterwards
	SUBCASE("keep current write block")
	{
		uint64_t						 PreSize = Store.TotalSize();
		BlockStoreCompactState			 State;
		BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
			{
				continue;
			}
			State.IncludeBlock(Location.BlockIndex);
		}
		uint64_t RemovedSize = 0;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
				RemovedSize += Removed;
				CHECK(Moved.empty());
				return true;
			},
			[]() { return 0; });
		CHECK_EQ(Store.TotalSize() + RemovedSize, PreSize);
		// 1088 matches the second argument passed to Store.Initialize above
		CHECK_LE(Store.TotalSize(), 1088);
		CHECK_GT(Store.TotalSize(), 0);
	}
	// Every location is registered as kept: no callback may fire and the size must be unchanged
	SUBCASE("keep everthing")
	{
		Store.Flush(true);

		uint64_t			   PreSize = Store.TotalSize();
		BlockStoreCompactState State;
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			State.AddKeepLocation(Location);
		}
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray&, uint64_t) {
				CHECK(false);
				return true;
			},
			[]() {
				CHECK(false);
				return 0;
			});
		CHECK_EQ(Store.TotalSize(), PreSize);
	}
	// Block 0 is included with nothing kept: exactly its content must be removed, nothing moved
	SUBCASE("drop first block")
	{
		uint64_t						 PreSize = Store.TotalSize();
		BlockStoreCompactState			 State;
		BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();

		CHECK(!SnapshotState.m_ActiveWriteBlocks.contains(0));
		State.IncludeBlock(0);

		// The size of block 0 is taken to be the end offset of the last chunk written to it
		uint64_t FirstBlockSize = 0;
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			if (Location.BlockIndex == 0)
			{
				FirstBlockSize = Max<uint64_t>(FirstBlockSize, Location.Offset + Location.Size);
			}
		}

		uint64_t RemovedSize = 0;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
				CHECK(Moved.empty());
				RemovedSize += Removed;
				return true;
			},
			[]() {
				CHECK(false);
				return 0;
			});
		CHECK_EQ(FirstBlockSize, RemovedSize);
		CHECK_EQ(Store.TotalSize(), PreSize - FirstBlockSize);
	}
	// Block 0 is included, its first SkipChunkCount chunks are dropped and the remaining chunks
	// of block 0 are kept: the kept ones must stay readable (at their reported new locations)
	// while the dropped ones must be gone
	SUBCASE("compact first block")
	{
		uint64_t						 PreSize = Store.TotalSize();
		BlockStoreCompactState			 State;
		BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();

		CHECK(!SnapshotState.m_ActiveWriteBlocks.contains(0));
		State.IncludeBlock(0);

		const uint64_t					SkipChunkCount = 2;
		std::vector<BlockStoreLocation> DroppedLocations(ChunkLocations.begin(), ChunkLocations.begin() + SkipChunkCount);
		for (auto It = ChunkLocations.begin() + SkipChunkCount; It != ChunkLocations.end(); It++)
		{
			const BlockStoreLocation& Location = *It;
			if (Location.BlockIndex != 0)
			{
				continue;
			}
			State.AddKeepLocation(Location);
		}
		uint64_t RemovedSize = 0;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
				for (const auto& Move : Moved)
				{
					const BlockStoreLocation& OldLocation = State.GetLocation(Move.first);
					CHECK(OldLocation.BlockIndex == 0);	 // Only move from block 0
					CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), OldLocation) == DroppedLocations.end());
					// Patch our bookkeeping so the chunk can be found at its new location below
					auto It = std::find(ChunkLocations.begin(), ChunkLocations.end(), OldLocation);
					CHECK(It != ChunkLocations.end());
					(*It) = Move.second;
				}
				RemovedSize += Removed;
				return true;
			},
			[]() {
				CHECK(false);
				return 0;
			});

		// Count the dropped chunks down as they are encountered so that only the first
		// SkipChunkCount locations in block 0 are expected to be gone; any other location
		// (including one that would still point into block 0) has to be readable.
		uint64_t SkipsLeft = SkipChunkCount;

		for (size_t Index = 0; Index < ChunkLocations.size(); Index++)
		{
			const BlockStoreLocation& Location = ChunkLocations[Index];
			if (Location.BlockIndex == 0 && SkipsLeft > 0)
			{
				CHECK(!Store.TryGetChunk(Location));
				--SkipsLeft;
				continue;
			}
			IoBuffer Buffer = Store.TryGetChunk(Location);
			CHECK(Buffer);
			IoHash RawHash = IoHash::HashBuffer(Buffer.Data(), Buffer.Size());
			CHECK_EQ(ChunkHashes[Index], RawHash);
		}
		CHECK_LT(Store.TotalSize(), PreSize);
	}
	// Outside of the active write blocks every second chunk is dropped and the others are kept.
	// SkipFlag alternates between "keep" (false) and "drop" (true) while walking the locations.
	SUBCASE("compact every other item")
	{
		uint64_t						 PreSize = Store.TotalSize();
		BlockStoreCompactState			 State;
		BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
		bool							 SkipFlag	   = false;

		// First pass: include every block that holds at least one chunk that will be dropped
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
			{
				continue;
			}
			if (SkipFlag)
			{
				State.IncludeBlock(Location.BlockIndex);
				SkipFlag = false;
				continue;
			}
			SkipFlag = true;
		}

		// Second pass (same alternation): register the kept locations and remember the dropped ones
		SkipFlag = false;
		std::vector<BlockStoreLocation> DroppedLocations;
		for (const BlockStoreLocation& Location : ChunkLocations)
		{
			if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
			{
				continue;
			}
			if (SkipFlag)
			{
				DroppedLocations.push_back(Location);
				SkipFlag = false;
				continue;
			}
			State.AddKeepLocation(Location);
			SkipFlag = true;
		}
		uint64_t RemovedSize = 0;
		Store.CompactBlocks(
			State,
			Alignment,
			[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
				for (const auto& Move : Moved)
				{
					const BlockStoreLocation& OldLocation = State.GetLocation(Move.first);
					CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), OldLocation) == DroppedLocations.end());
					// Patch our bookkeeping so the chunk can be found at its new location below
					auto It = std::find(ChunkLocations.begin(), ChunkLocations.end(), OldLocation);
					CHECK(It != ChunkLocations.end());
					(*It) = Move.second;
				}
				RemovedSize += Removed;
				return true;
			},
			[]() {
				CHECK(false);
				return 0;
			});
		// Walk the (patched) locations with the same alternation: dropped chunks must be gone,
		// everything else must be readable with its original content
		SkipFlag = false;
		for (size_t Index = 0; Index < ChunkLocations.size(); Index++)
		{
			const BlockStoreLocation& Location = ChunkLocations[Index];
			if (SkipFlag && !SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
			{
				CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), Location) != DroppedLocations.end());
				CHECK(!Store.TryGetChunk(Location));
				SkipFlag = false;
				continue;
			}
			IoBuffer Buffer = Store.TryGetChunk(Location);
			CHECK(Buffer);
			IoHash RawHash = IoHash::HashBuffer(Buffer.Data(), Buffer.Size());
			CHECK_EQ(ChunkHashes[Index], RawHash);
			SkipFlag = true;
		}
		CHECK_LT(Store.TotalSize(), PreSize);
	}
}

#endif

// Intentionally empty function.
// NOTE(review): presumably exists so other code can reference a symbol from this translation
// unit and thereby keep the test cases above from being dropped by the linker - confirm
// against the callers.
void
blockstore_forcelink()
{
}

}  // namespace zen
